Convolutional Neural Networks (CNNs) for Image Data (from scratch NumPy + PyTorch)#
CNNs are neural networks designed for grid-like data (images, audio spectrograms, some time-series). They work because they encode three key inductive biases:
Locality: patterns are local (edges, corners).
Translation structure: the same pattern can appear anywhere.
Weight sharing: we reuse the same filter across the whole image.
This notebook builds intuition with Plotly visuals, implements a tiny CNN from scratch in NumPy (including convolution backprop), then shows the equivalent workflow in PyTorch.
Learning goals#
By the end you should be able to:
explain convolution with channels, kernels, padding, stride
understand why convolutions use far fewer parameters than dense layers
implement and train a small CNN in pure NumPy (manual backprop)
train the same model in PyTorch and visualize results
Prerequisites#
backpropagation for MLPs
NumPy fundamentals
(optional) basic PyTorch
Notebook roadmap#
Data: load a small image dataset (no downloads)
Intuition: convolution and filters (Plotly heatmaps)
From scratch (NumPy): Conv2D + ReLU + MaxPool + Linear
Visual diagnostics (NumPy): filters, feature maps, confusion matrix
Practical (PyTorch): same model idea with autograd
Pitfalls + exercises
import time
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import os
import plotly.io as pio
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
TORCH_AVAILABLE = True
except Exception as e:
TORCH_AVAILABLE = False
_TORCH_IMPORT_ERROR = e
# Plotly defaults; the renderer can be overridden via the PLOTLY_RENDERER env var.
pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
# Compact NumPy printing for readable array dumps.
np.set_printoptions(precision=4, suppress=True)
# One seed drives the train/test split, model init, and minibatch shuffling.
SEED = 42
rng = np.random.default_rng(SEED)
# --- Run configuration ---
FAST_RUN = True # set False for better accuracy / more epochs
EPOCHS_NUMPY = 10 if FAST_RUN else 40
EPOCHS_TORCH = 10 if FAST_RUN else 30
BATCH_SIZE = 64
LR_NUMPY = 0.05        # SGD learning rate (NumPy model)
MOMENTUM = 0.9         # SGD momentum (NumPy model)
WEIGHT_DECAY = 1e-4    # L2 penalty, used by both NumPy SGD and torch Adam
LR_TORCH = 1e-3        # Adam learning rate (PyTorch model)
RUN_GRAD_CHECK = False # optional (slow)
1) Data: a tiny image dataset (8×8 handwritten digits)#
To keep this notebook offline-friendly, we use sklearn.datasets.load_digits():
1 channel (grayscale)
8×8 images
10 classes (0–9)
# Load the bundled 8x8 digits dataset (ships with scikit-learn; fully offline).
digits = load_digits()
X = digits.images.astype(np.float32) # (N, 8, 8)
y = digits.target.astype(np.int64)
# Normalize pixels roughly into [0, 1] (raw values span 0..16).
X = X / 16.0
# Add channel dimension: (N, C=1, H, W)
X = X[:, None, :, :]
# Stratified split keeps the per-digit class balance in both train and test.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=SEED, stratify=y
)
print('X_train', X_train.shape, 'X_test', X_test.shape)
X_train (1347, 1, 8, 8) X_test (450, 1, 8, 8)
# Visualize a grid of sample images
# Draw 36 distinct training images and lay them out in a 9-wide facet grid.
idx = rng.choice(len(X_train), size=36, replace=False)
imgs = X_train[idx, 0] # (36, 8, 8)
fig = px.imshow(
    imgs,
    facet_col=0,
    facet_col_wrap=9,
    color_continuous_scale='gray',
    title='Sample training images (8×8)',
)
# Blank out the auto-generated "facet_col=..." caption above each tile.
for a in fig.layout.annotations:
    a.text = ''
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)
fig.update_layout(coloraxis_showscale=False, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
2) Convolution intuition (filters as pattern detectors)#
In deep learning, “convolution” is usually implemented as cross-correlation (we don’t flip the kernel):
[ y[n, c_{out}, i, j] = b[c_{out}] + \sum_{c_{in}} \sum_{u=0}^{kH-1} \sum_{v=0}^{kW-1} x[n, c_{in}, i+u, j+v] \cdot W[c_{out}, c_{in}, u, v] ]
Why this is powerful:
Local receptive fields: each output sees a small patch.
Weight sharing: the same kernel is used everywhere → far fewer parameters.
from plotly.subplots import make_subplots
def conv2d_single_channel_naive(img: np.ndarray, kernel: np.ndarray, padding: int = 0, stride: int = 1) -> np.ndarray:
    """Cross-correlate a single-channel image with a 2D kernel.

    Zero-pads the image by `padding` on every side, then slides the kernel
    with the given stride, summing elementwise products at each position.
    Returns a float32 array of shape (out_h, out_w).
    """
    image = np.asarray(img, dtype=np.float32)
    weights = np.asarray(kernel, dtype=np.float32)
    kh, kw = weights.shape
    # An int pad width pads both sides of both axes.
    padded = np.pad(image, padding, mode='constant') if padding > 0 else image
    ph, pw = padded.shape
    rows = (ph - kh) // stride + 1
    cols = (pw - kw) // stride + 1
    result = np.zeros((rows, cols), dtype=np.float32)
    for r in range(rows):
        r0 = r * stride
        for c in range(cols):
            c0 = c * stride
            window = padded[r0:r0 + kh, c0:c0 + kw]
            result[r, c] = float((window * weights).sum())
    return result
# Pick one image
img = X_train[0, 0]
# Two classic edge kernels (Sobel operators).
# NOTE(review): kx measures the horizontal intensity gradient (it responds to
# vertical edges) and ky the vertical gradient; the subplot titles below use
# the opposite naming convention — confirm which labeling is intended.
kx = np.array([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=np.float32)
ky = np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=np.float32)
# padding=1 with a 3x3 kernel preserves the 8x8 spatial size.
fx = conv2d_single_channel_naive(img, kx, padding=1)
fy = conv2d_single_channel_naive(img, ky, padding=1)
fig = make_subplots(rows=1, cols=3, subplot_titles=['Input', 'Horizontal edges', 'Vertical edges'])
fig.add_trace(go.Heatmap(z=img, colorscale='gray', showscale=False), row=1, col=1)
# Diverging colormap centered at 0 distinguishes positive/negative responses.
fig.add_trace(go.Heatmap(z=fx, colorscale='RdBu', zmid=0, showscale=False), row=1, col=2)
fig.add_trace(go.Heatmap(z=fy, colorscale='RdBu', zmid=0, showscale=False), row=1, col=3)
fig.update_layout(title='A toy convolution demo (naive implementation)', height=320, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
# Parameter count: dense vs conv
# Suppose we map an 8×8 image (64 pixels) to 8 feature maps (still 8×8).
# - A dense layer would connect every input pixel to every output pixel.
# - A conv layer uses a small kernel shared across space.
def n_params_dense(fan_in: int, fan_out: int) -> int:
    """Parameter count of a fully-connected layer: weights plus biases."""
    return fan_out * (fan_in + 1)
def n_params_conv2d(c_in: int, c_out: int, k: int) -> int:
    """Parameter count of a k×k conv layer: c_out·c_in·k·k weights + c_out biases."""
    return c_out * (c_in * k * k + 1)
# Compare a dense 64->512 mapping against the conv layer that produces the
# same number of output activations (8 channels of 8x8) via weight sharing.
dense = n_params_dense(64, 64 * 8)
conv = n_params_conv2d(c_in=1, c_out=8, k=3)
_df = pd.DataFrame(
    [
        {'layer': 'Dense (64 → 512)', 'params': dense},
        {'layer': 'Conv2D (1→8, 3×3)', 'params': conv},
    ]
)
fig = px.bar(
    _df,
    x='layer',
    y='params',
    text='params',
    title='Parameter count: dense vs convolution (weight sharing)',
)
fig.update_traces(textposition='outside')
fig.update_layout(yaxis_title='number of parameters')
fig.show()
3) From scratch in NumPy: a tiny CNN#
We implement a small CNN without autograd:
Conv2D forward + backward (via im2col/col2im), ReLU, MaxPool2D, Flatten, Linear, and a softmax cross-entropy loss
SGD with momentum
The goal is clarity and correctness (not speed).
def _pad2d(x: np.ndarray, pad: int) -> np.ndarray:
if pad <= 0:
return x
return np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
def im2col(x: np.ndarray, kH: int, kW: int, stride: int, pad: int) -> tuple[np.ndarray, tuple[int, int, int, int]]:
    '''
    Unroll sliding kH×kW patches of a batched image tensor into matrix rows,
    so convolution becomes a single matrix multiply.

    x: (N, C, H, W)
    Returns:
        cols: (N*out_h*out_w, C*kH*kW) — one row per output position
        out_shape: (out_h, out_w, H_padded, W_padded) for col2im
    '''
    N, C, H, W = x.shape
    x_p = _pad2d(x, pad)
    _, _, Hp, Wp = x_p.shape
    out_h = (Hp - kH) // stride + 1
    out_w = (Wp - kW) // stride + 1
    cols = np.empty((N, C, kH, kW, out_h, out_w), dtype=x.dtype)
    # For each kernel offset (i, j), one strided slice grabs the pixel that
    # offset touches at every output position — vectorized over N, C, out_h, out_w.
    for i in range(kH):
        i_end = i + stride * out_h
        for j in range(kW):
            j_end = j + stride * out_w
            cols[:, :, i, j, :, :] = x_p[:, :, i:i_end:stride, j:j_end:stride]
    # Reorder so each row holds one full receptive field (C*kH*kW values).
    cols = cols.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, C * kH * kW)
    return cols, (out_h, out_w, Hp, Wp)
def col2im(cols: np.ndarray, x_shape: tuple[int, int, int, int], kH: int, kW: int, stride: int, pad: int, out_shape: tuple[int, int, int, int]) -> np.ndarray:
    """Inverse of im2col for backprop: scatter-add patch rows back onto an image.

    Overlapping patches accumulate via +=, which is exactly the sum of gradient
    contributions each input pixel receives. Returns an array of shape x_shape.
    """
    N, C, H, W = x_shape
    out_h, out_w, Hp, Wp = out_shape
    # Undo the row layout produced by im2col: back to (N, C, kH, kW, out_h, out_w).
    cols_reshaped = cols.reshape(N, out_h, out_w, C, kH, kW).transpose(0, 3, 4, 5, 1, 2)
    x_p = np.zeros((N, C, Hp, Wp), dtype=cols.dtype)
    for i in range(kH):
        i_end = i + stride * out_h
        for j in range(kW):
            j_end = j + stride * out_w
            x_p[:, :, i:i_end:stride, j:j_end:stride] += cols_reshaped[:, :, i, j, :, :]
    if pad <= 0:
        return x_p
    # Strip the zero padding that was added in the forward pass.
    return x_p[:, :, pad:-pad, pad:-pad]
class Conv2D:
    """2D convolution (cross-correlation) layer with manual forward/backward.

    Weights W: (c_out, c_in, k, k); bias b: (c_out,). backward() writes
    gradients into dW/db in place; step() applies SGD with momentum.
    """

    def __init__(self, c_in: int, c_out: int, k: int, stride: int = 1, pad: int = 0, rng: np.random.Generator | None = None):
        self.c_in = c_in
        self.c_out = c_out
        self.k = k
        self.stride = stride
        self.pad = pad
        rng = rng or np.random.default_rng(0)
        # He initialization: std = sqrt(2 / fan_in) with fan_in = c_in * k * k.
        scale = np.sqrt(2.0 / (c_in * k * k))
        self.W = (rng.standard_normal((c_out, c_in, k, k)).astype(np.float32) * scale)
        self.b = np.zeros((c_out,), dtype=np.float32)
        self.dW = np.zeros_like(self.W)
        self.db = np.zeros_like(self.b)
        self._cache = None  # (x, cols, out_shape) saved by forward for backward

    def forward(self, x: np.ndarray) -> np.ndarray:
        """x: (N, c_in, H, W) -> (N, c_out, out_h, out_w) via im2col matmul."""
        cols, out_shape = im2col(x, self.k, self.k, self.stride, self.pad)
        W_col = self.W.reshape(self.c_out, -1)
        out = cols @ W_col.T + self.b[None, :]
        N, _, _, _ = x.shape
        out_h, out_w, _, _ = out_shape
        # Rows are ordered (n, i, j); reshape, then move channels to axis 1.
        out = out.reshape(N, out_h, out_w, self.c_out).transpose(0, 3, 1, 2)
        self._cache = (x, cols, out_shape)
        return out

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """dout: (N, c_out, out_h, out_w). Fills dW/db; returns dx shaped like x."""
        x, cols, out_shape = self._cache
        dout_col = dout.transpose(0, 2, 3, 1).reshape(-1, self.c_out)
        self.db[...] = dout_col.sum(axis=0)
        W_col = self.W.reshape(self.c_out, -1)
        self.dW[...] = (dout_col.T @ cols).reshape(self.W.shape)
        dcols = dout_col @ W_col
        # Scatter the patch gradients back onto the (padded) input image.
        dx = col2im(dcols, x.shape, self.k, self.k, self.stride, self.pad, out_shape)
        return dx

    def step(self, lr: float, vW: np.ndarray, vb: np.ndarray, weight_decay: float, momentum: float) -> tuple[np.ndarray, np.ndarray]:
        """One SGD-with-momentum update; weight decay applies to W only, not b."""
        vW = momentum * vW - lr * (self.dW + weight_decay * self.W)
        vb = momentum * vb - lr * self.db
        self.W += vW
        self.b += vb
        return vW, vb
class ReLU:
    """Elementwise max(x, 0); caches the positive-input mask for backprop."""

    def __init__(self):
        # Boolean mask of positions where the forward input was > 0.
        self._mask = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Zero out non-positive entries, remembering where they were."""
        mask = x > 0
        self._mask = mask
        return np.where(mask, x, 0.0)

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """Pass gradient through only where the input was positive."""
        return np.where(self._mask, dout, 0.0)
class MaxPool2D:
    """Non-overlapping max pooling implemented with a reshape trick.

    NOTE(review): the `stride` argument is stored but never used — the reshape
    implicitly assumes stride == pool. Both call sites in this notebook pass
    (2, 2); confirm no caller relies on stride != pool.
    """

    def __init__(self, pool: int = 2, stride: int = 2):
        self.pool = pool
        self.stride = stride  # unused; kept for API familiarity
        self._cache = None  # (x, x_reshaped, out) saved for backward

    def forward(self, x: np.ndarray) -> np.ndarray:
        """(N, C, H, W) -> (N, C, H//pool, W//pool), max over each window."""
        N, C, H, W = x.shape
        p = self.pool
        assert H % p == 0 and W % p == 0, 'For simplicity we assume H and W divisible by pool size.'
        out_h, out_w = H // p, W // p
        # Split H and W into (out, p) axis pairs, then reduce over the p axes.
        x_reshaped = x.reshape(N, C, out_h, p, out_w, p)
        out = x_reshaped.max(axis=(3, 5))
        self._cache = (x, x_reshaped, out)
        return out

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """Route each window's gradient to its max element(s).

        Tied maxima share the gradient equally (divide by tie count) — a
        valid subgradient that conserves the total gradient mass.
        """
        x, x_reshaped, out = self._cache
        _, _, out_h, p, out_w, _ = x_reshaped.shape
        mask = x_reshaped == out[:, :, :, None, :, None]
        denom = mask.sum(axis=(3, 5), keepdims=True)
        denom = np.where(denom == 0, 1, denom)  # defensive: avoid div-by-zero
        dx_reshaped = mask * (dout[:, :, :, None, :, None] / denom)
        dx = dx_reshaped.reshape(x.shape)
        return dx
class Flatten:
    """Collapse everything after the batch dim, remembering the input shape."""

    def __init__(self):
        # Input shape cached on forward, needed to un-flatten in backward.
        self._shape = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """(N, ...) -> (N, prod(...))."""
        self._shape = x.shape
        batch = x.shape[0]
        return x.reshape(batch, -1)

    def backward(self, dout: np.ndarray) -> np.ndarray:
        """Restore the gradient to the cached input shape."""
        return dout.reshape(self._shape)
class Linear:
def __init__(self, fan_in: int, fan_out: int, rng: np.random.Generator | None = None):
rng = rng or np.random.default_rng(0)
scale = np.sqrt(2.0 / fan_in)
self.W = (rng.standard_normal((fan_in, fan_out)).astype(np.float32) * scale)
self.b = np.zeros((fan_out,), dtype=np.float32)
self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)
self._cache = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._cache = x
return x @ self.W + self.b
def backward(self, dout: np.ndarray) -> np.ndarray:
x = self._cache
self.dW[...] = x.T @ dout
self.db[...] = dout.sum(axis=0)
return dout @ self.W.T
def step(self, lr: float, vW: np.ndarray, vb: np.ndarray, weight_decay: float, momentum: float) -> tuple[np.ndarray, np.ndarray]:
vW = momentum * vW - lr * (self.dW + weight_decay * self.W)
vb = momentum * vb - lr * self.db
self.W += vW
self.b += vb
return vW, vb
def softmax_cross_entropy(logits: np.ndarray, y: np.ndarray) -> tuple[float, np.ndarray]:
    """Mean cross-entropy over the batch plus the gradient w.r.t. the logits.

    Subtracts the row max before exponentiating for numerical stability.
    The returned gradient is (softmax(logits) - onehot(y)) / N, ready to
    feed straight into backprop.
    """
    z = logits.astype(np.float32)
    z = z - z.max(axis=1, keepdims=True)  # stabilize exp
    e = np.exp(z)
    probs = e / e.sum(axis=1, keepdims=True)
    n = z.shape[0]
    rows = np.arange(n)
    # Tiny epsilon guards log(0) for extremely confident wrong predictions.
    loss = float(-np.log(probs[rows, y] + 1e-12).mean())
    grad = probs  # reuse the probs buffer for the gradient
    grad[rows, y] -= 1
    grad /= n
    return loss, grad
Optional: quick gradient check (small and slow)#
Finite-difference gradient checks are a good way to verify your Conv2D.backward().
Kept off by default.
def grad_check_conv2d():
    """Finite-difference spot-check of Conv2D.backward's weight gradient.

    Uses the scalar loss L = sum(out * dout), whose analytic dL/dW is exactly
    what backward() stores in conv.dW. Compares it against central differences
    at 10 randomly chosen weight coordinates and prints the relative error.
    """
    rng_local = np.random.default_rng(0)
    x = rng_local.standard_normal((2, 1, 6, 6)).astype(np.float32)
    conv = Conv2D(1, 2, k=3, stride=1, pad=1, rng=rng_local)
    out = conv.forward(x)
    dout = rng_local.standard_normal(out.shape).astype(np.float32)
    # Fill conv.dW once; the perturbed forward passes below do not touch it.
    _ = conv.backward(dout)
    eps = 1e-3
    for _ in range(10):
        # Pick a random weight coordinate (out_channel, in_channel, row, col).
        oc = int(rng_local.integers(0, conv.W.shape[0]))
        ic = int(rng_local.integers(0, conv.W.shape[1]))
        i = int(rng_local.integers(0, conv.W.shape[2]))
        j = int(rng_local.integers(0, conv.W.shape[3]))
        old = conv.W[oc, ic, i, j]
        # Central difference: (L(w + eps) - L(w - eps)) / (2 * eps).
        conv.W[oc, ic, i, j] = old + eps
        out_p = conv.forward(x)
        loss_p = float((out_p * dout).sum())
        conv.W[oc, ic, i, j] = old - eps
        out_m = conv.forward(x)
        loss_m = float((out_m * dout).sum())
        conv.W[oc, ic, i, j] = old  # restore the weight
        num = (loss_p - loss_m) / (2 * eps)
        ana = float(conv.dW[oc, ic, i, j])
        rel_err = abs(num - ana) / (abs(num) + abs(ana) + 1e-12)
        print(f'W[{oc},{ic},{i},{j}] num={num:.5f} ana={ana:.5f} rel_err={rel_err:.3e}')

# Disabled by default; toggle RUN_GRAD_CHECK in the run configuration.
if RUN_GRAD_CHECK:
    grad_check_conv2d()
Train the NumPy CNN#
Architecture:
Conv(1→8, 3×3, pad=1) + ReLU + MaxPool(2×2)
Conv(8→16, 3×3, pad=1) + ReLU + MaxPool(2×2)
Flatten
Linear(16·2·2 → 10)
# Build the NumPy CNN: two Conv-ReLU-Pool stages, then Flatten -> Linear head.
rng_model = np.random.default_rng(SEED)
conv1 = Conv2D(1, 8, k=3, stride=1, pad=1, rng=rng_model)    # 1x8x8  -> 8x8x8
relu1 = ReLU()
pool1 = MaxPool2D(pool=2, stride=2)                          # 8x8x8  -> 8x4x4
conv2 = Conv2D(8, 16, k=3, stride=1, pad=1, rng=rng_model)   # 8x4x4  -> 16x4x4
relu2 = ReLU()
pool2 = MaxPool2D(pool=2, stride=2)                          # 16x4x4 -> 16x2x2
flat = Flatten()
fc = Linear(16 * 2 * 2, 10, rng=rng_model)
# Momentum buffers
# One velocity array per parameter tensor, threaded through step_numpy below.
v_c1_W = np.zeros_like(conv1.W)
v_c1_b = np.zeros_like(conv1.b)
v_c2_W = np.zeros_like(conv2.W)
v_c2_b = np.zeros_like(conv2.b)
v_fc_W = np.zeros_like(fc.W)
v_fc_b = np.zeros_like(fc.b)
def forward_numpy(xb: np.ndarray) -> np.ndarray:
    """Run one forward pass through the hand-built CNN; returns class logits."""
    layers = (conv1, relu1, pool1, conv2, relu2, pool2, flat, fc)
    activation = xb
    for layer in layers:
        activation = layer.forward(activation)
    return activation
def backward_numpy(dlogits: np.ndarray) -> None:
    """Propagate the loss gradient back through every layer in reverse order."""
    grad = dlogits
    for layer in (fc, flat, pool2, relu2, conv2, pool1, relu1, conv1):
        grad = layer.backward(grad)
def step_numpy(lr: float) -> None:
    """Apply one SGD-with-momentum update to every layer with parameters.

    The velocity buffers live at module scope, so reassigning the returned
    values requires the `global` declaration.
    """
    global v_c1_W, v_c1_b, v_c2_W, v_c2_b, v_fc_W, v_fc_b
    v_c1_W, v_c1_b = conv1.step(lr, v_c1_W, v_c1_b, WEIGHT_DECAY, MOMENTUM)
    v_c2_W, v_c2_b = conv2.step(lr, v_c2_W, v_c2_b, WEIGHT_DECAY, MOMENTUM)
    v_fc_W, v_fc_b = fc.step(lr, v_fc_W, v_fc_b, WEIGHT_DECAY, MOMENTUM)
def predict_numpy(x: np.ndarray) -> np.ndarray:
    """Predicted class labels (argmax over logits) for a batch of images."""
    return forward_numpy(x).argmax(axis=1)
def iterate_minibatches(X_: np.ndarray, y_: np.ndarray, batch_size: int, rng_: np.random.Generator):
    """Yield (X, y) minibatches covering the data in a fresh random order.

    The final batch may be smaller than batch_size.
    """
    order = rng_.permutation(len(X_))
    total = len(X_)
    for lo in range(0, total, batch_size):
        batch = order[lo:lo + batch_size]
        yield X_[batch], y_[batch]
# Train the NumPy CNN and record per-epoch loss/accuracy.
history_numpy = []
start = time.time()
for epoch in range(1, EPOCHS_NUMPY + 1):
    losses = []
    # One pass over shuffled minibatches: forward, loss, backward, update.
    for xb, yb in iterate_minibatches(X_train, y_train, BATCH_SIZE, rng):
        logits = forward_numpy(xb)
        loss, dlogits = softmax_cross_entropy(logits, yb)
        losses.append(loss)
        backward_numpy(dlogits)
        step_numpy(LR_NUMPY)
    # End-of-epoch evaluation on the full train and test sets.
    yhat_train = predict_numpy(X_train)
    yhat_test = predict_numpy(X_test)
    train_acc = accuracy_score(y_train, yhat_train)
    test_acc = accuracy_score(y_test, yhat_test)
    history_numpy.append({
        'epoch': epoch,
        'loss': float(np.mean(losses)),
        'train_acc': float(train_acc),
        'test_acc': float(test_acc),
    })
    print(f"[NumPy] epoch {epoch:02d}/{EPOCHS_NUMPY} loss={history_numpy[-1]['loss']:.4f} train_acc={train_acc:.3f} test_acc={test_acc:.3f}")
elapsed = time.time() - start
print(f"NumPy training time: {elapsed:.2f}s")
[NumPy] epoch 01/10 loss=2.0222 train_acc=0.552 test_acc=0.562
[NumPy] epoch 02/10 loss=0.7954 train_acc=0.880 test_acc=0.864
[NumPy] epoch 03/10 loss=0.2587 train_acc=0.800 test_acc=0.769
[NumPy] epoch 04/10 loss=0.3640 train_acc=0.950 test_acc=0.949
[NumPy] epoch 05/10 loss=0.1249 train_acc=0.977 test_acc=0.956
[NumPy] epoch 06/10 loss=0.0924 train_acc=0.973 test_acc=0.951
[NumPy] epoch 07/10 loss=0.0610 train_acc=0.991 test_acc=0.973
[NumPy] epoch 08/10 loss=0.0353 train_acc=0.989 test_acc=0.960
[NumPy] epoch 09/10 loss=0.0345 train_acc=0.997 test_acc=0.971
[NumPy] epoch 10/10 loss=0.0274 train_acc=0.998 test_acc=0.971
NumPy training time: 0.84s
# Plot the NumPy training curves from the recorded per-epoch history.
dfn = pd.DataFrame(history_numpy)
fig = go.Figure()
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['loss'], mode='lines+markers', name='loss'))
fig.update_layout(title='NumPy training loss', xaxis_title='epoch', yaxis_title='cross-entropy')
fig.show()
fig = go.Figure()
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['train_acc'], mode='lines+markers', name='train'))
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['test_acc'], mode='lines+markers', name='test'))
fig.update_layout(title='NumPy accuracy', xaxis_title='epoch', yaxis_title='accuracy', yaxis=dict(range=[0, 1]))
fig.show()
4) Visual diagnostics (NumPy)#
learned filters in the first conv layer
feature maps (activations) after Conv1 + ReLU
confusion matrix and misclassifications
# Learned Conv1 filters: with a single input channel, each filter is one 3x3 map.
W1 = conv1.W[:, 0] # (8, 3, 3)
fig = px.imshow(
    W1,
    facet_col=0,
    facet_col_wrap=4,
    color_continuous_scale='RdBu',
    # Shared color limits so filters are visually comparable.
    zmin=float(W1.min()),
    zmax=float(W1.max()),
    title='NumPy: learned Conv1 filters (3×3)',
)
for a in fig.layout.annotations:
    a.text = ''
fig.update_layout(coloraxis_showscale=False)
fig.show()
# Feature maps: push one test image through Conv1 + ReLU only.
# (This overwrites the layers' forward caches — harmless after training.)
x0 = X_test[0:1]
out1 = relu1.forward(conv1.forward(x0)) # (1, 8, 8, 8)
fig = px.imshow(
    out1[0],
    facet_col=0,
    facet_col_wrap=4,
    color_continuous_scale='Viridis',
    title='NumPy: feature maps after Conv1 + ReLU (one test image)',
)
for a in fig.layout.annotations:
    a.text = ''
fig.update_layout(coloraxis_showscale=False)
fig.show()
# Confusion matrix over the held-out test set (rows = true, cols = predicted).
yhat = predict_numpy(X_test)
cm = confusion_matrix(y_test, yhat, labels=list(range(10)))
fig = px.imshow(
    cm,
    text_auto=True,
    color_continuous_scale='Blues',
    title='NumPy: confusion matrix (test set)',
    labels=dict(x='pred', y='true', color='count'),
)
# Force a tick for every digit class 0..9.
fig.update_xaxes(tickmode='array', tickvals=list(range(10)))
fig.update_yaxes(tickmode='array', tickvals=list(range(10)))
fig.show()
# Show up to 36 misclassified test images, captioned with true/pred labels.
# NOTE(review): if the model classifies everything correctly, `imgs` is an
# empty stack and px.imshow would receive zero images — confirm that case is
# acceptable (or guard this cell).
mis = np.where(yhat != y_test)[0]
n_show = min(36, len(mis))
mis = mis[:n_show]
imgs = X_test[mis, 0]
texts = [f"true={int(y_test[i])}, pred={int(yhat[i])}" for i in mis]
fig = px.imshow(
    imgs,
    facet_col=0,
    facet_col_wrap=9,
    color_continuous_scale='gray',
    title=f'NumPy: misclassified test images (first {n_show})',
)
# Replace the default facet annotations with the label captions.
for a, t in zip(fig.layout.annotations, texts):
    a.text = t
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)
fig.update_layout(coloraxis_showscale=False, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
5) Practical implementation in PyTorch#
PyTorch gives you:
automatic differentiation (autograd)
optimized kernels
convenient layers (nn.Conv2d, nn.MaxPool2d, etc.)
We train a small CNN with the same structure.
# Fail fast with the original import error if torch could not be imported.
if not TORCH_AVAILABLE:
    raise RuntimeError(f"PyTorch not available: {_TORCH_IMPORT_ERROR}")
# Device (suppress noisy CUDA init warnings in restricted environments)
import warnings
with warnings.catch_warnings():
    warnings.filterwarnings('ignore', message='CUDA initialization:.*')
    cuda_ok = bool(torch.cuda.is_available())
device = torch.device('cuda' if cuda_ok else 'cpu')
print('device:', device)
# The dataset is tiny, so move all tensors to the device once, up front.
Xtr = torch.from_numpy(X_train).to(device)
ytr = torch.from_numpy(y_train).to(device)
Xte = torch.from_numpy(X_test).to(device)
yte = torch.from_numpy(y_test).to(device)
# Shuffle only the training loader; evaluation loaders keep a fixed order.
train_loader = DataLoader(TensorDataset(Xtr, ytr), batch_size=BATCH_SIZE, shuffle=True)
train_eval_loader = DataLoader(TensorDataset(Xtr, ytr), batch_size=256, shuffle=False)
test_loader = DataLoader(TensorDataset(Xte, yte), batch_size=256, shuffle=False)
class TinyCNN(nn.Module):
    """Two Conv-BN-ReLU-Pool stages followed by a linear classifier head.

    Input: (N, 1, 8, 8) images. Output: (N, 10) class logits. Mirrors the
    NumPy model, with BatchNorm added (hence bias=False on the convs).
    """

    def __init__(self):
        super().__init__()
        stages = [
            nn.Conv2d(1, 8, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(8, 16, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        ]
        self.features = nn.Sequential(*stages)
        # After two 2x2 pools an 8x8 input is 2x2 spatially, with 16 channels.
        self.head = nn.Linear(16 * 2 * 2, 10)

    def forward(self, x):
        feats = self.features(x)
        flat = torch.flatten(feats, start_dim=1)
        return self.head(flat)
# Seed torch before constructing the model so initialization is reproducible.
torch.manual_seed(SEED)
model = TinyCNN().to(device)
# Adam with weight decay, matching WEIGHT_DECAY used by the NumPy SGD.
opt = torch.optim.Adam(model.parameters(), lr=LR_TORCH, weight_decay=WEIGHT_DECAY)
def eval_loader(loader):
    """Evaluate the global `model` on a DataLoader.

    Returns (mean per-batch loss, accuracy). Switches the model to eval mode
    (affects BatchNorm) and disables autograd for speed. The loss is an
    unweighted mean over batches, so a smaller final batch is slightly
    over-weighted.
    """
    model.eval()
    ys = []
    yhats = []
    losses = []
    with torch.no_grad():
        for xb, yb in loader:
            logits = model(xb)
            loss = F.cross_entropy(logits, yb)
            losses.append(loss.item())
            # Collect labels/predictions on CPU for sklearn's accuracy_score.
            ys.append(yb.detach().cpu().numpy())
            yhats.append(logits.argmax(dim=1).detach().cpu().numpy())
    y_all = np.concatenate(ys)
    yhat_all = np.concatenate(yhats)
    return float(np.mean(losses)), float(accuracy_score(y_all, yhat_all))
# Train the PyTorch model, recording batch loss and end-of-epoch metrics.
history_torch = []
start = time.time()
for epoch in range(1, EPOCHS_TORCH + 1):
    model.train()  # enable BatchNorm batch statistics
    batch_losses = []
    for xb, yb in train_loader:
        opt.zero_grad(set_to_none=True)
        logits = model(xb)
        loss = F.cross_entropy(logits, yb)
        loss.backward()
        opt.step()
        batch_losses.append(loss.item())
    # Re-evaluate in eval mode on the fixed-order loaders.
    train_loss, train_acc = eval_loader(train_eval_loader)
    test_loss, test_acc = eval_loader(test_loader)
    history_torch.append({
        'epoch': epoch,
        'loss': float(np.mean(batch_losses)),
        'train_acc': train_acc,
        'test_acc': test_acc,
        'train_loss_eval': train_loss,
        'test_loss_eval': test_loss,
    })
    print(f"[Torch] epoch {epoch:02d}/{EPOCHS_TORCH} loss={history_torch[-1]['loss']:.4f} train_acc={train_acc:.3f} test_acc={test_acc:.3f}")
elapsed = time.time() - start
print(f"Torch training time: {elapsed:.2f}s")
device: cpu
[Torch] epoch 01/10 loss=2.1325 train_acc=0.309 test_acc=0.280
[Torch] epoch 02/10 loss=1.6937 train_acc=0.765 test_acc=0.751
[Torch] epoch 03/10 loss=1.3111 train_acc=0.841 test_acc=0.813
[Torch] epoch 04/10 loss=1.0346 train_acc=0.906 test_acc=0.873
[Torch] epoch 05/10 loss=0.8131 train_acc=0.927 test_acc=0.884
[Torch] epoch 06/10 loss=0.6370 train_acc=0.944 test_acc=0.911
[Torch] epoch 07/10 loss=0.5183 train_acc=0.952 test_acc=0.929
[Torch] epoch 08/10 loss=0.4214 train_acc=0.958 test_acc=0.936
[Torch] epoch 09/10 loss=0.3520 train_acc=0.964 test_acc=0.940
[Torch] epoch 10/10 loss=0.2969 train_acc=0.971 test_acc=0.944
Torch training time: 0.60s
# Plot the PyTorch training curves from the recorded per-epoch history.
dft = pd.DataFrame(history_torch)
fig = go.Figure()
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['loss'], mode='lines+markers', name='train_loss(batch)'))
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['test_loss_eval'], mode='lines+markers', name='test_loss'))
fig.update_layout(title='Torch loss', xaxis_title='epoch', yaxis_title='cross-entropy')
fig.show()
fig = go.Figure()
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['train_acc'], mode='lines+markers', name='train'))
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['test_acc'], mode='lines+markers', name='test'))
fig.update_layout(title='Torch accuracy', xaxis_title='epoch', yaxis_title='accuracy', yaxis=dict(range=[0, 1]))
fig.show()
# Torch confusion matrix: one full-batch forward pass over the test tensors.
model.eval()
with torch.no_grad():
    logits = model(Xte)
yhat = logits.argmax(dim=1).detach().cpu().numpy()
cm = confusion_matrix(y_test, yhat, labels=list(range(10)))
fig = px.imshow(
    cm,
    text_auto=True,
    color_continuous_scale='Blues',
    title='Torch: confusion matrix (test set)',
    labels=dict(x='pred', y='true', color='count'),
)
# Force a tick for every digit class 0..9.
fig.update_xaxes(tickmode='array', tickvals=list(range(10)))
fig.update_yaxes(tickmode='array', tickvals=list(range(10)))
fig.show()
Pitfalls + exercises#
If training is unstable: lower the learning rate, check initialization, and verify shapes.
If accuracy stalls: add channels, add another conv layer, or train longer.
Exercises#
Add a third convolution layer and compare curves.
Replace MaxPool with a strided convolution.
Visualize feature maps deeper in the network.
References#
CS231n notes on CNNs